# NameServer 源码分析
作者:Ethan.Yang
博客:https://blog.ethanyang.cn (opens new window)
# 1. NameServer 整体流程

RocketMQ 中,NameServer 是一个无状态、轻量级的路由服务组件,可以看作轻量级的注册中心, 主要负责:
- 维护 Topic → Broker 的路由信息
- 接收 Broker 注册 / 心跳
- 为 Producer / Consumer 提供路由查询能力
从宏观上看,NameServer 的核心流程可以拆成 3 个阶段:启动 → 路由注册 → 路由剔除。
# 1.1 NameServer 启动
NameServer 启动后主要完成:
- 初始化内存路由表结构(
RouteInfoManager中的各类 Map) - 启动 Netty 网络服务端,监听指定端口(默认 9876)
- 注册请求处理器,接受 Broker 注册、心跳以及 Producer / Consumer 的路由查询请求
- 开启定时任务,定期扫描、剔除不活跃的 Broker
此时 NameServer 自身是无状态路由节点,所有路由数据都仅存储在内存中,不做持久化。
# 1.2 路由注册
Broker 启动后会向 所有 NameServer 周期性发送:
- 注册(
REGISTER_BROKER)请求 - 心跳(
HEART_BEAT)请求
上报的核心内容包括:
- Broker 集群名称(
clusterName) - Broker 名称(
brokerName)与主从角色(brokerId) - Topic 配置(
TopicConfig列表) - Broker 地址(IP + Port)
- FilterServer 列表(如果有)
NameServer 收到注册 / 心跳请求后会更新内存中的多张路由表,包括但不限于:
- Topic → Queue 信息(
topicQueueTable) - BrokerName → BrokerData(
brokerAddrTable) - ClusterName → BrokerName 集合(
clusterAddrTable) - BrokerAddr → 存活信息(
brokerLiveTable)
Producer、Consumer 查询路由时,即是基于这些内存结构来获取可用 Broker 地址和队列信息。
# 1.3 路由剔除
NameServer 与每台 Broker 之间保持长连接,并通过定时任务完成 Broker 存活检查:
- 每 10 秒 扫描一次
brokerLiveTable - 如果某个 Broker 超过 120 秒 未收到心跳,判定为下线 / 失效
当判定 Broker 失效时,NameServer 会:
- 关闭与该 Broker 的 Netty Channel
- 从
brokerLiveTable中移除该 Broker - 从
topicQueueTable中剔除该 Broker 下的所有队列 - 从
brokerAddrTable、clusterAddrTable等结构中移除其相关数据
之后 Producer / Consumer 下一次从 NameServer 拉取路由时,就不会再拿到该失效 Broker 的地址,从而实现集群的动态感知与高可用。
# 2. NameServer 启动流程源码
NameServer 是一个独立的进程,核心入口在 NamesrvStartup,最终构建并启动 NamesrvController。
NamesrvStartup.main()
└─> main0()
└─> createNamesrvController()
└─> controller.initialize()
└─> controller.start()
2
3
4
5

# 2.1 加载KV配置
NamesrvStartup#createNamesrvController 负责解析启动参数、加载配置文件并创建 NamesrvController:
public static NamesrvController createNamesrvController(String[] args)
throws IOException, JoranException {
// 省略参数解析相关代码...
// 1. 创建 NamesrvConfig(NameServer 自身配置)
final NamesrvConfig namesrvConfig = new NamesrvConfig();
// 2. 创建 NettyServerConfig(网络层配置)
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
// 默认监听端口
nettyServerConfig.setListenPort(9876);
// 3. 解析 -c 参数:指定外部配置文件
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
Properties properties = new Properties();
properties.load(in);
// 将 Properties 中的配置映射到对象
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
// 4. 解析 -p 参数:打印当前配置并退出(调试用)
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}
// 5. 启动参数覆盖配置文件(优先级更高)
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
// 6. 创建 NamesrvController
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
return controller;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
几点小结:
-c:指定配置文件路径,加载后映射到NamesrvConfig与NettyServerConfig-p:打印当前最终配置(包括默认值 + 配置文件 + 命令行),然后退出,一般用于排查配置问题- 命令行参数最终会覆盖配置文件中的同名配置项
# 2.2 构建 NettyRemotingServer(接收路由 / 心跳)
// NamesrvController#initialize()
public boolean initialize() {
// ... 其他初始化逻辑
// 创建 NettyRemotingServer,负责网络收发
this.remotingServer = new NettyRemotingServer(
this.nettyServerConfig, this.brokerHousekeepingService);
// 注册请求处理器等
this.registerProcessor();
// ... 其他初始化逻辑
return true;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
NameServer 启动时会调用:
// NamesrvController#start()
public void start() throws Exception {
// 启动 Netty 服务,正式对外提供路由/注册/心跳服务
this.remotingServer.start();
}
2
3
4
5
# 2.3 定时任务剔除超时 Broker
NamesrvController#initialize() 中还会启动一个定时任务,用于扫描并移除不活跃的 Broker:
public boolean initialize() {
// ...
// 业务处理线程池(用于处理网络请求)
this.remotingExecutor = Executors.newFixedThreadPool(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactoryImpl("RemotingExecutorThread_"));
this.registerProcessor();
// 每隔 10s 扫描一次 Broker 存活状态
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// ...
return true;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
路由剔除核心逻辑在 RouteInfoManager#scanNotActiveBroker():
// Broker 连接超时时间(默认 2 分钟)
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
public void scanNotActiveBroker() {
Iterator<Entry<String, BrokerLiveInfo>> it =
this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
// 若上次心跳时间 + 2min < 当前时间,则认为该 Broker 已失效
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
// 关闭网络连接
RemotingUtil.closeChannel(next.getValue().getChannel());
// 从 brokerLiveTable 中移除
it.remove();
log.warn("The broker channel expired, {} {}ms",
next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
// 进一步从路由表中剔除该 Broker(topicQueueTable、brokerAddrTable、clusterAddrTable 等)
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
时间点对照:
- Broker:默认 每 30 秒 向所有 NameServer 发送一次心跳
- NameServer:每 10 秒 扫描一次
brokerLiveTable- 超过 120 秒未收到心跳 → 判定 Broker 失效,进行路由剔除
这会导致一个设计上的“短暂不一致”:
- NameServer 认为 Broker 仍然存活(尚未超过 120s)
- 但 Broker 可能已经宕机
- 此时 Producer / Consumer 仍可能拿到已下线 Broker 的地址
RocketMQ 的处理方式是:将故障规避策略放在 Producer / Consumer 端,例如:
- 发送异常后重试其他队列 / Broker
- 消费失败后重试 / 重新拉取
NameServer 因此可以做到非常轻量、无状态、易扩展,这也是它的设计理念:简单、快速,复杂性放在业务端去兜底。

路由信息的删除有两个触发点:
- 被动删除:定时扫描发现 Broker 超时(2min 无心跳)
- 主动删除:Broker 正常关闭时,会主动发送
UNREGISTER_BROKER请求,通知 NameServer 移除路由
两者最终都会调用同一套“路由删除逻辑”,从多张路由表中删除与该 Broker 相关的全部信息。
# 3 NameServer设计亮点
# 3.1 读写锁:读多写少的高并发优化
RouteInfoManager 中使用了 ReentrantReadWriteLock 读写锁来保护路由表的并发访问:
public class RouteInfoManager {
// 适用于“读多写少”场景的读写锁
private final ReadWriteLock lock = new ReentrantReadWriteLock();
// topic 对应队列信息
private final HashMap<String /* topic */, List<QueueData>> topicQueueTable;
// broker 基础信息
private final HashMap<String /* brokerName */, BrokerData> brokerAddrTable;
// 集群名 -> BrokerName 集合
private final HashMap<String /* clusterName */, Set<String /* brokerName */>> clusterAddrTable;
// brokerAddr -> 存活信息(心跳)
private final HashMap<String /* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// brokerAddr -> FilterServer 列表
private final HashMap<String /* brokerAddr */, List<String> /* Filter Server */> filterServerTable;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
典型的读写场景:
生产者发送消息时:频繁读取路由信息
public byte[] getAllTopicList() { TopicList topicList = new TopicList(); try { this.lock.readLock().lockInterruptibly(); try { topicList.getTopicList().addAll(this.topicQueueTable.keySet()); } finally { this.lock.readLock().unlock(); } } catch (Exception e) { log.error("getAllTopicList Exception", e); } return topicList.encode(); }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15Broker 心跳 / 注册时:更新路由信息(写)
public RegisterBrokerResult registerBroker( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final Channel channel) { RegisterBrokerResult result = new RegisterBrokerResult(); try { this.lock.writeLock().lockInterruptibly(); try { // 维护 clusterAddrTable Set<String> brokerNames = this.clusterAddrTable.get(clusterName); if (brokerNames == null) { brokerNames = new HashSet<String>(); this.clusterAddrTable.put(clusterName, brokerNames); } brokerNames.add(brokerName); boolean registerFirst = false; // 维护 brokerAddrTable BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (brokerData == null) { registerFirst = true; brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>()); this.brokerAddrTable.put(brokerName, brokerData); } Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs(); // ... 省略 master/slave 处理逻辑 } finally { this.lock.writeLock().unlock(); } } catch (Exception e) { log.error("registerBroker Exception", e); } return result; }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
总结一下:
synchronized/ReentrantLock是排他锁,同一时刻只有一个线程可进入ReentrantReadWriteLock则将锁拆分为读锁 / 写锁:- 多个读线程可以并发读
- 写线程需要独占,会阻塞读 / 写线程
NameServer 的路由表属于典型的“读远多于写”的场景:
- Producer / Consumer 不断查询路由(读多)
- Broker 心跳 / 注册才会触发写,频率相对较低(写少)
因此使用读写锁可以有效提升并发性能。
# 3.2 路由信息全部基于内存,不做持久化
NameServer 内部通过几张 HashMap 来管理所有路由信息:
topicQueueTable: Topic → 队列信息,用于 Producer 选择队列时做负载均衡brokerAddrTable: BrokerName →BrokerData(含 clusterName、master/slave 地址等)clusterAddrTable: ClusterName → BrokerName 集合brokerLiveTable: BrokerAddr →BrokerLiveInfo(上次心跳时间、Netty Channel 等)filterServerTable: BrokerAddr → FilterServer 列表(用于类模式消息过滤)
NameServer 不对这些信息做任何落盘操作:
- 路由信息只存在于内存中
- Broker 重启后会重新向所有 NameServer 注册
- NameServer 挂了重启,也不会影响整个集群的正确性
持久化的责任完全交给 Broker,NameServer 只关注“当前可用路由”,因此实现非常轻量,吞吐也非常高。
# 3.3 NameServer 无状态化与多机房容灾

几个关键特性:
- NameServer 之间不通信、不做数据同步
- 每个 Broker 会向所有 NameServer 注册 / 发送心跳
- Producer / Consumer 与集群中任意一台 NameServer 建立长连接即可
多机房部署示例:
- 假设一个 RocketMQ 集群跨两个机房部署,每个机房都有若干 NameServer、Broker、客户端
- 当两个机房之间网络链路中断时:
- 各自机房内的 NameServer 仍然可用
- NameServer 只会保留本机房仍有心跳的 Broker
- 客户端在本机房内访问本机房 NameServer,只能拿到本机房内可达的 Broker 路由
这意味着:
- 网络分区不会影响 NameServer 自身的可用性(反正它们不互相同步)
- NameServer 只基于是否能收到心跳来判断 Broker 是否可用
- 对于跨机房的 Broker,只要网络断了,就会被各自机房的 NameServer 剔除出路由信息
整体上,RocketMQ 通过 NameServer 的无状态 + 全内存设计,换来了:
- 极高的路由读写性能
- 非常简单的水平扩展与多机房容灾能力
- 清晰的职责边界(路由中心 ≠ 元数据中心)
← 源码分析准备 Broker 源码分析 →